Different async mechanism: MPI message buffer #111
Conversation
This looks great, very well thought out. I also like the iteration to avoid 0MQ's async API, the simpler we make it the better. Re. slowing down the fast path, it's probably much of a muchness, given that this implementation is much nicer (and actually works), I wouldn't let that stop us. We can do some benchmarking once it's done as well.
src/scheduler/MpiWorld.cpp
Outdated
functionCallClients;
/* Each MPI rank runs in a separate thread, however they interact with faabric
 * as a library. Thus, we use thread_local storage to guarantee that each rank
 * sees its own version of these data structures.
I don't quite understand this comment, what do you mean "they interact with faabric as a library"?
Really the comment is too verbose and poorly phrased, will rephrase.
src/scheduler/MpiWorld.cpp
Outdated
assert(index >= 0 && index < size * size);

// Lazily initialise send endpoints
if (mpiMessageEndpoints[index] == nullptr) {
Will these definitely be nullptr when uninitialised? If the condition for being uninitialised is that they are nullptr, I would usually explicitly set that in the constructor. However, it might be overkill and I'm not sure what the spec says.
(Currently going through the comments in #105, but this applies as well.)
And yes, these will definitely be nullptr when uninitialised, as we explicitly emplace_back a nullptr in line 45.
for (int i = 0; i < size * size; i++) {
    unackedMessageBuffers.emplace_back(nullptr);
}
}
Can we do unackedMessageBuffers = std::vector(size * size, nullptr) rather than a loop here?
In this case yes, as we are using shared pointers. However, I will use resize as I think it fits better. That is, unackedMessageBuffers.resize(size * size, nullptr).
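For reference, a minimal sketch of the two equivalent initialisations (the element type here is a stand-in, not faabric's actual type):

```cpp
#include <memory>
#include <vector>

struct MpiMessageBuffer
{}; // stand-in for the real UMB element type

int main()
{
    int size = 4;
    std::vector<std::shared_ptr<MpiMessageBuffer>> unackedMessageBuffers;

    // Loop version from the diff above
    for (int i = 0; i < size * size; i++) {
        unackedMessageBuffers.emplace_back(nullptr);
    }

    // Equivalent one-liner: fills the vector with size * size null pointers
    unackedMessageBuffers.clear();
    unackedMessageBuffers.resize(size * size, nullptr);
}
```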
include/faabric/scheduler/MpiWorld.h
Outdated
faabric_datatype_t* dataType;
int count;
faabric::MPIMessage::MPIMessageType messageType;
};
If we put the shared pointer to the message into this struct, could we get rid of one of the std::lists in this class?
Yes, in fact after some deep refactoring I realized we could just as well use one single list (which makes iterator management much easier).
Ah sorry I got confused and reviewed this when you had actually requested a review on the other one 🤦
@@ -366,4 +362,187 @@ TEST_CASE_METHOD(RemoteCollectiveTestFixture,
    senderThread.join();
    localWorld.destroy();
}

TEST_CASE_METHOD(RemoteMpiTestFixture,
                 "Test sending sync and async message to same host",
This is the test that exposed the flaws in the thread pool (and would make the current master fail).
Nice, this is looking good, just a few style/structure changes.
src/scheduler/MpiWorld.cpp
Outdated
SPDLOG_TRACE("MPI - pending recv {} -> {}", sendRank, recvRank);
auto _m = getLocalQueue(sendRank, recvRank)->dequeue();

assert(_m != nullptr);
Sorry, I should have been clearer. I think it would be more useful to put the check in the enqueue method, as that will catch the issue at the source of the problem (i.e. when it's enqueued) rather than here.
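A minimal sketch of the suggested change, using a generic queue stand-in rather than faabric's actual queue class:

```cpp
#include <cassert>
#include <memory>
#include <queue>
#include <utility>

// Generic stand-in for the local queue; asserting in enqueue catches a
// null message at the source rather than at every dequeue site.
template<typename T>
class LocalQueue
{
  public:
    void enqueue(std::shared_ptr<T> msg)
    {
        assert(msg != nullptr);
        queue.push(std::move(msg));
    }

    std::shared_ptr<T> dequeue()
    {
        auto msg = queue.front();
        queue.pop();
        return msg;
    }

  private:
    std::queue<std::shared_ptr<T>> queue;
};
```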
}
}
unackedMessageBuffers.clear();
}
Would be good to add a couple of small tests to make sure these destroy checks work, e.g. call isend a few times, then call destroy and REQUIRE_THROWS.
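A rough sketch of such a test in the Catch2 style of the surrounding tests; the fixture member name and the isend arguments are assumptions based on the diffs above, and world setup is elided:

```cpp
// Hypothetical test sketch, not verified against the actual PR
TEST_CASE_METHOD(RemoteMpiTestFixture,
                 "Test destroying world with unacked async messages")
{
    // ... world setup elided ...

    int data = 42;

    // Issue a few async sends that are never awaited
    for (int i = 0; i < 3; i++) {
        localWorld.isend(0, 1, BYTES(&data), MPI_INT, 1);
    }

    // Destroying the world with pending requests should throw
    REQUIRE_THROWS(localWorld.destroy());
}
```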
Thanks for pointing this out; I had actually missed checking whether iSendRequest was empty at destruction time.
@@ -212,15 +205,36 @@ class MpiWorld
    void initLocalQueues();

    // Rank-to-rank sockets for remote messaging
    void initRemoteMpiEndpoint(int sendRank, int recvRank);
    int getMpiPort(int sendRank, int recvRank);
    std::vector<int> basePorts;
I have changed these to accommodate the new port offset per world.
Force-pushed from a3287fd to bee0ef8.
@@ -261,16 +262,51 @@ std::string MpiWorld::getHostForRank(int rank)
    return host;
}

// Returns a pair (sendPort, recvPort)
Also worth checking this, which has changed since the last review.
(basically this commit)
Nice, LGTM.
Problem
The current thread pool is shared among all ranks. Thus, any thread in the pool may perform send/recv on behalf of any other rank in the host. This has several problems:
- EADDRINUSE errors when the main thread and a pool-worker thread try to recv from the same rank (into the same rank as well).
- send/recv are very lightweight methods. One could argue that running them in separate threads is overkill.

In line 327 here I introduce a test that breaks the current semantics.
Solution
- Remove the thread pool. Keep track of the async messages we expect to receive per (sendRank, recvRank) pair, and perform batch reads in blocking calls.
- Rely on ZeroMQ's asynchronous API, in particular the poll call, which blocks until at least one message has been received (but does not actually call recv), i.e. it polls the underlying receive buffers. After having thought it through, I think we don't need polling (which is used to multiplex input sockets), just recv-ing enough times according to our protocol.
- We assume that the MPI application expects to recv messages in the order it issues recv/irecvs. That is, if the application issues irecv, irecv, and recv, then the 3rd message entering our network buffer will correspond to the recv, and not to any of the irecvs, even in a case like the one sketched below. This is the case for OpenMPI without tags (tested); note that we don't support tags.
Detail on the implementation
We introduce a thread-local unacknowledged message buffer (UMB). There is one per rank-to-rank channel local to the host; each host has localRanks * worldSize such channels. It stores messages that have been received but not yet claimed, or claimed but not yet received.
The struct prototype is:
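(A hedged reconstruction: dataType, count, and messageType appear in the header diff earlier in this thread, and the shared pointer to the message was added following the review discussion; the struct name and the remaining fields are assumptions.)

```cpp
#include <cstdint>
#include <memory>

// Depends on faabric's MPI headers for faabric_datatype_t and MPIMessage;
// requestId and buffer are assumed fields, not confirmed by the diff.
struct PendingAsyncMpiMessage
{
    int requestId = -1;                       // assumed: id returned by irecv
    uint8_t* buffer = nullptr;                // assumed: caller's recv buffer
    faabric_datatype_t* dataType = nullptr;
    int count = 0;
    faabric::MPIMessage::MPIMessageType messageType;
    std::shared_ptr<faabric::MPIMessage> msg; // null until the message arrives
};
```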
There are three methods that modify the UMB: recv -> msg, irecv -> id, and await(id) -> msg. All are executed by the same thread, so we need no locks:
- irecv: receive a message asynchronously.
- recv: receive a message synchronously (block until we receive it).
- await(id): wait until the irecv with id has finished (i.e. its msg has arrived).

Pseudo-code for each, together with the auxiliary method that blocks until a certain number of messages have arrived at our inbound (ZeroMQ) network sockets, is sketched below:
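(A hedged C++ sketch of the semantics described above; the names, the list-based UMB, and the transport call are assumptions, not faabric's actual API.)

```cpp
#include <iterator>
#include <list>
#include <memory>
#include <stdexcept>

struct Message
{}; // payload elided

// One pending async receive per issued irecv (cf. the struct sketch above)
struct PendingMsg
{
    int requestId = -1;
    std::shared_ptr<Message> msg; // null until the message arrives
};

// One UMB per (sendRank, recvRank) channel, local to the rank's thread
thread_local std::list<PendingMsg> umb;
thread_local int nextRequestId = 0;

// Stand-in for a blocking ZeroMQ recv on this channel's socket
std::shared_ptr<Message> recvFromTransport();

// Auxiliary method: block until every entry up to and including `last` has
// been fed a message off the wire. Messages arrive in issue order, so the
// oldest unfilled entry always gets the next message.
void drainUpTo(std::list<PendingMsg>::iterator last)
{
    for (auto it = umb.begin(); it != std::next(last); ++it) {
        if (it->msg == nullptr) {
            it->msg = recvFromTransport();
        }
    }
}

// irecv: register an expected message and return a request id immediately
int irecv()
{
    umb.push_back(PendingMsg{ nextRequestId++, nullptr });
    return umb.back().requestId;
}

// recv: feed every earlier outstanding irecv first; the next message off
// the wire is then ours
std::shared_ptr<Message> recv()
{
    if (!umb.empty()) {
        drainUpTo(std::prev(umb.end()));
    }
    return recvFromTransport();
}

// await(id): block until the message for that irecv has arrived, then
// acknowledge it (remove it from the buffer)
std::shared_ptr<Message> await(int requestId)
{
    for (auto it = umb.begin(); it != umb.end(); ++it) {
        if (it->requestId == requestId) {
            drainUpTo(it);
            auto result = it->msg;
            umb.erase(it);
            return result;
        }
    }
    throw std::runtime_error("await: unknown request id");
}
```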
Possible pitfalls
Could a recv be stuck behind an irecv whose message never appears? No: the sends matching the irecvs need to have been done before the send matching the recv (if the recv happens after those irecvs), so the messages for the earlier irecvs are guaranteed to precede the recv's message on the wire.